d30fff7c925243e77f11eed71bd1eea636dcc0bb,hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java,AggregationClient,getAvgArgs,#number[]#ColumnInterpreter#Scan#,286
Before Change
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateProtocol instance)
throws IOException {
return instance.getAvg(ci, scan);
}
}, avgCallBack);
} finally {
if (table != null) {
table.close();
After Change
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateService instance)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<S,Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
pair.setFirst(ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(0))));
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
return pair;
}
}, avgCallBack);
} finally {
if (table != null) {
table.close();